#include "ConnectMeta.h"

#include "Socket.h"
 #include "HoComm.h"
#include <cerrno>
#include <thread>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <netinet/in.h> // struct sockaddr_in
#include <string.h>
#include <sstream>



/**
 * Creates the bookkeeping object for one descriptor watched through epoll.
 *
 * @param epollFd  epoll descriptor used by later epoll_ctl() calls (stored in epfd)
 * @param connFd   descriptor managed by this object (stored in actFd)
 * @param fds      initial value of the state machine (stored in fdState)
 *
 * No event is registered here: evFlag starts at 0, all callbacks start unset,
 * the byte counters start at 0 and begTime is stamped with the current time.
 * Every instance gets a process-wide sequence number (objId) that is printed
 * as a construction trace.
 */
ConnectMeta::ConnectMeta(int epollFd, int connFd, FdState fds)
    : epfd(epollFd),
      actFd(connFd),
      evFlag(0),
      fdState(fds),
      writePosition(0),
      begTime(time(NULL)),
      endTime(0),
      sendBytes(0),
      recvBytes(0),
      splitReqFunc(NULL),
      onMessageReachFunc(NULL),
      onAcceptFunc(NULL),
      onCloseFunc(NULL),
      _onRemoveRefFunc(NULL),
      onTimerFunc(NULL),
      onConnectResultFunc(NULL)
{
    // Shared by all instances; atomic so concurrent constructions get distinct ids.
    static atomic_int s_lastId(0);

    objId = s_lastId.fetch_add(1) + 1;
    printf("+ConnectMeta construct #%d\n", objId);
}

/**
 * Tears the object down: drops any remaining epoll registration, hands the
 * descriptor to IFCLOSEFD and prints a destruction trace.
 */
ConnectMeta::~ConnectMeta( void )
{
    // Only talk to epoll when a descriptor is still held and events are registered.
    const bool stillRegistered = (INVALID_FD != actFd) && evFlag;
    if (stillRegistered)
    {
        setEvt(0);
    }

    // NOTE(review): IFCLOSEFD is a project macro; presumably it closes the
    // descriptor when valid and resets it - confirm against its definition.
    IFCLOSEFD(actFd);

    printf("~ConnectMeta destroy #%d\n", objId);
}

// Called after epoll_wait() has reported this descriptor. When the cached
// registration contains EPOLLONESHOT, the cached mask is cleared so that the
// next setEvt() sees "nothing registered" in evFlag.
void ConnectMeta::oneShotUpdate( void )
{
    const bool oneShotArmed = (evFlag & EPOLLONESHOT) != 0;
    if (!oneShotArmed)
    {
        return;
    }

    evFlag = 0;
}

// void ConnectMeta::setEPfd( int epfd )
// {
// 	epfd = epfd;
// }

/**
 * Replaces the descriptor managed by this object.
 *
 * When an epoll descriptor and a current descriptor are both valid and the
 * new descriptor differs from the current one, the current epoll registration
 * is removed first via setEvt(0).
 *
 * @param actfd  descriptor to manage from now on
 * @return 0 on success; otherwise the non-zero result of setEvt(0), in which
 *         case actFd is left unchanged. The previous descriptor is not closed here.
 */
int ConnectMeta::changeActFd( int actfd )
{
    const bool hasBinding = (INVALID_FD != epfd) && (INVALID_FD != actFd);

    // The old descriptor's epoll binding must be cleared before switching.
    if (hasBinding && actFd != actfd)
    {
        const int rc = setEvt(0);
        if (0 != rc)
        {
            return rc;
        }
    }

    actFd = actfd;
    return 0;
}

/**
 * Forwards to Socket::setKeepAlive() for the managed descriptor.
 *
 * @param idleSec  passed through as the second argument
 * @return whatever Socket::setKeepAlive() returns
 *
 * NOTE(review): the fixed arguments 10 and 5 are presumably the probe interval
 * and probe count - confirm against Socket::setKeepAlive.
 */
int ConnectMeta::setKeepAlive(int idleSec)
{
    const int rc = Socket::setKeepAlive(actFd, idleSec, 10, 5);
    return rc;
}

/**
 * Forwards to Socket::setSndBuffSize() for the managed descriptor.
 *
 * @param bytes  requested size, passed through unchanged
 * @return whatever Socket::setSndBuffSize() returns
 */
int ConnectMeta::setSndBuffSize(int bytes)
{
    const int rc = Socket::setSndBuffSize(actFd, bytes);
    return rc;
}

/**
 * Forwards to Socket::setRcvBuffSize() for the managed descriptor.
 *
 * @param bytes  requested size, passed through unchanged
 * @return whatever Socket::setRcvBuffSize() returns
 */
int ConnectMeta::setRcvBuffSize(int bytes)
{
    const int rc = Socket::setRcvBuffSize(actFd, bytes);
    return rc;
}


/**
 * Adds event bits to the current registration.
 *
 * @param eventFg  epoll event bits to OR into the cached mask
 * @return result of setEvt() for the merged mask
 */
int ConnectMeta::addEvt( int eventFg )
{
    const int merged = evFlag | eventFg;
    return setEvt(merged);
}

/**
 * Removes event bits from the current registration.
 *
 * @param eventFg  epoll event bits to clear from the cached mask
 * @return result of setEvt() for the remaining mask (a mask of 0 unregisters)
 */
int ConnectMeta::rmEvt( int eventFg )
{
    const int remaining = evFlag & ~eventFg;
    return setEvt(remaining);
}

/**
 * Makes the epoll registration of actFd equal to eventFg.
 *
 * The operation is chosen from the cached mask evFlag:
 *   - evFlag == 0 and eventFg != 0  -> EPOLL_CTL_ADD
 *   - evFlag != 0 and eventFg == 0  -> EPOLL_CTL_DEL
 *   - both non-zero and different   -> EPOLL_CTL_MOD
 *   - otherwise                     -> nothing to do
 *
 * @param eventFg  complete epoll event mask wanted for actFd (0 = unregister)
 * @return -14 when actFd is INVALID_FD; otherwise 0 on success (or no-op) and
 *         the epoll_ctl() result (-1) on failure.
 *
 * On success evFlag is updated to eventFg. On failure evFlag is left as it was
 * and a description including errno is appended to errMsg.
 */
int ConnectMeta::setEvt( int eventFg )
{
    if (INVALID_FD == actFd) return -14;

    // Value-initialised so that no indeterminate bytes of the data union are
    // handed to the kernel (only data.fd is meaningful here).
    struct epoll_event epevt = {};
    epevt.events = eventFg;
    epevt.data.fd = actFd;

    const char* flowdesc = "";
    int ret = 0;

    if (0 == evFlag)
    {
        if (eventFg) // nothing registered yet
        {
            ret = epoll_ctl(epfd, EPOLL_CTL_ADD, actFd, &epevt);
            flowdesc = "epoll-add";
        }
    }
    else if (0 == eventFg) // registered before, now remove
    {
        // The event argument is ignored for DEL, but kernels before 2.6.9
        // reject a null pointer, so a valid one is always supplied.
        ret = epoll_ctl(epfd, EPOLL_CTL_DEL, actFd, &epevt);
        flowdesc = "epoll-del";
    }
    else if (eventFg != evFlag) // registered before, mask changed
    {
        ret = epoll_ctl(epfd, EPOLL_CTL_MOD, actFd, &epevt);
        flowdesc = "epoll-mod";
    }

    if (0 == ret)
    {
        evFlag = eventFg;
        return ret;
    }

    // Capture errno before any other call can overwrite it.
    const int eno = errno;
    errMsg += string(flowdesc) + " event fail evFlag=" + to_string(evFlag)
            + " errno=" + to_string(eno) + ";";
    //LOGERROR("HEPEVENT| msg=%s event fail| epfd=%d| sockfd=%d| ev=%X", flowdesc, epfd, actFd, eventFg);

    return ret;
}

void ConnectMeta::start()
{
    switch (fdState)
    {
    case FDS_INITIAL_LISTEN:
        fdState.store(FDS_ACCEPTING, memory_order_release);
        setEvt(EPOLLIN|EPOLLET);
        break;
    case FDS_INITIAL_CONNECT:
        fdState.store(FDS_CONNECTING, memory_order_release);
    case FDS_CONNECTING:
        setEvt(EPOLLIN|EPOLLET|EPOLLOUT);
        break;
    case FDS_ESTABLISH:
        setEvt(EPOLLIN|EPOLLET);
        break;
    
    default:
        break;
    }
}

void ConnectMeta::stop(bool isCallCB)
{
    if (errMsg.empty())
    {
        errMsg = ACTIVE_STOP_REASON;
    }

    this->_stop(isCallCB);
}

/**
 * Performs the actual shutdown of the connection.
 *
 * Steps, in order:
 *   1. call onCloseFunc(actFd, errMsg) when isCallCB is true and it is set;
 *   2. call _onRemoveRefFunc(actFd, shared_from_this()) once, then clear it;
 *   3. unregister from epoll (setEvt(0)) and hand the descriptor to IFCLOSEFD;
 *   4. stamp endTime on the first call only.
 *
 * @param isCallCB  whether the close callback should be invoked
 *
 * Lifetime: step 2 may release the last external owner of this object. A
 * handle is therefore held until this function returns; without it the object
 * would be destroyed as soon as the callback's temporary argument goes away
 * and the remaining steps would touch freed memory.
 */
void ConnectMeta::_stop(bool isCallCB)
{
    // Stays empty unless the remove-reference callback is about to run.
    decltype(shared_from_this()) keepAlive;

    if (isCallCB && onCloseFunc)
    {
        onCloseFunc(actFd, errMsg);
    }

    if (_onRemoveRefFunc)
    {
        keepAlive = shared_from_this();
        _onRemoveRefFunc(actFd, shared_from_this());
        _onRemoveRefFunc = NULL;
    }

    setEvt(0);
    IFCLOSEFD(actFd);
    if (0 == endTime)
    {
        endTime = time(NULL);
    }
}

/**
 * Builds a one-line, human-readable summary of the connection.
 *
 * Layout: [<name>=]fd:<actFd>/<evFlag hex>h state:<fdState>[ closed:<n>s]
 *         life:<n>s writing:<writingMsg size>+<writeQueue size> tx:<sendBytes> rx:<recvBytes>
 *
 * "life" counts seconds from begTime to now, or to endTime once the
 * connection has been stopped; "closed" is the time elapsed since endTime.
 *
 * @return the formatted summary
 */
string ConnectMeta::toString()
{
    stringstream out;
    int lifeSec = time(NULL) - begTime;
    const bool hasName = !name.empty();
    const bool closed = endTime > 0;

    if (hasName)
    {
        out << name << "=";
    }

    out << "fd:" << actFd << "/";
    // Both the event mask and the state are written while hex is in effect.
    out << hex << evFlag << "h state:" << fdState << dec;

    if (closed)
    {
        lifeSec = endTime - begTime;
        out << " closed:" << (time(NULL) - endTime) << "s";
    }

    out << " life:" << lifeSec << "s writing:";
    out << writingMsg.size() << "+" << writeQueue.size();
    out << " tx:" << sendBytes << " rx:" << recvBytes;

    return out.str();
}

/**
 * Stores a label for this connection; toString() prefixes its output with it.
 *
 * @param connName  label to copy into name
 */
void ConnectMeta::setName(const string& connName)
{
    name.assign(connName);
}

/**
 * Drains the socket and dispatches complete requests.
 *
 * Reads with non-blocking recv() until no more data is available. Each chunk
 * is appended to readingMsg, splitReqFunc() moves complete requests into
 * readQueue, and onMessageReachFunc() is called for every queued request.
 *
 * When the peer closes (recv() returns 0), any leftover bytes in readingMsg
 * are delivered as a final message and the state moves ESTABLISH -> CLOSE.
 * On a read error other than EAGAIN/EWOULDBLOCK the state moves
 * ESTABLISH -> ERROR and the errno value is appended to errMsg.
 *
 * Preconditions: splitReqFunc and onMessageReachFunc are invoked without a
 * null check, so both must be set before data can arrive.
 * NOTE(review): callbacks receive shared_from_this(); the caller is assumed
 * to hold its own reference for the duration of this call - confirm.
 *
 * @return -13 when the state is not FDS_ESTABLISH, -14 on a read error,
 *         otherwise the number of bytes read during this call.
 */
int ConnectMeta::_recv()
{
    auto preState = fdState.load(memory_order_consume);
    if (FDS_ESTABLISH != preState)
    {
        return -13;
    }

    char buff[1024*8];
    int ret = 0;

    for (;;)
    {
        const ssize_t rdLen = ::recv(actFd, buff, sizeof(buff), MSG_DONTWAIT);
        if (rdLen > 0)
        {
            readingMsg.append(buff, rdLen);
            splitReqFunc(readQueue, readingMsg, shared_from_this()); // split into request packets

            recvBytes += rdLen;
            ret += rdLen;

            while (!readQueue.empty())
            {
                onMessageReachFunc(readQueue.front(), shared_from_this());
                readQueue.pop();
            }
            continue; // keep draining until the socket has nothing left
        }

        if (0 == rdLen) // orderly shutdown by the peer
        {
            if (!readingMsg.empty())
            {
                onMessageReachFunc(readingMsg, shared_from_this());
                readingMsg.clear();
            }

            // Strong variant: a single weak CAS may fail spuriously and the
            // close would then go unnoticed.
            fdState.compare_exchange_strong(preState, FDS_CLOSE, memory_order_release, memory_order_relaxed);
            break;
        }

        // rdLen < 0: take errno once, before any other call can change it.
        const int eno = errno;
        if ( !(EAGAIN == eno || EWOULDBLOCK == eno) )
        {
            fdState.compare_exchange_strong(preState, FDS_ERROR, memory_order_release, memory_order_relaxed);
            errMsg += "read fail " + to_string(eno) + ";";
            ret = -14;
        }
        break;
    }

    return ret;
}

int ConnectMeta::_send()
{
    int ret = 0;
    bool loop = true;
    auto preState = fdState.load(memory_order_consume);

    if (FDS_ESTABLISH != preState)
    {
        LOGERROR("conn state not establish");
        return -16;
    }

    while (loop)
    {
        loop = false;
        if (writingMsg.empty())
        {
            writePosition = 0;
            std::lock_guard<std::mutex> lck(writeMtx);
            if (!writeQueue.empty())
            {
                writingMsg = std::move(writeQueue.front());
                writeQueue.pop();
                writePosition = 0;
            }
            else
            {
                rmEvt(EPOLLOUT);
                break; // 没有要发送的消息
            }
        }

        size_t needLen = writingMsg.size() - writePosition;
        ssize_t nwrite = ::send(actFd, writingMsg.data()+writePosition, needLen, MSG_DONTWAIT);
        if (nwrite < 0)
        {
            int eno = errno;
            if (EAGAIN != eno && EWOULDBLOCK != eno)
            {
                fdState.compare_exchange_weak(preState, FDS_ERROR, memory_order_release);
                errMsg += "send fail ";
                errMsg += std::to_string(eno) + " " + strerror(eno) + ";";
                ret = -15;
            }
        }
        else
        {
            writePosition += nwrite;
            sendBytes += nwrite;
            ret += nwrite;

            //const char* chtmp = writingMsg.data();
            // printf("-- sendlast %d/%d fd=%d\n", 
            //     (int)nwrite, (int)needLen, actFd);
            
            if (needLen == (size_t)nwrite) // 发送完清除buff
            {
                writingMsg.clear();
                writePosition = 0;
                loop = true;
            }

        }
    }

    return ret;
}

// remark: call by epoll_wait() thread (i/o thread)
int ConnectMeta::onEpollEvent(int events)
{
    oneShotUpdate();
    bool errHappen = (EPOLLERR & events) || (EPOLLHUP & events);

    if (EPOLLIN & events) // input reach
    {
        if (FDS_CONNECTING == fdState.load(memory_order_consume))
        {
            fdState.store(errHappen? FDS_ERROR : FDS_ESTABLISH, memory_order_release);
            if (onConnectResultFunc)
            {
                onConnectResultFunc(!errHappen, shared_from_this());
            }

            if (!writingMsg.empty() && !errHappen)
            {
                addEvt(EPOLLIN | EPOLLOUT | EPOLLET);
            }
        }

        if (FDS_ESTABLISH == fdState.load(memory_order_consume))
        {
            _recv();
        }
        else if (FDS_ACCEPTING == fdState.load(memory_order_consume)) // 监听socket
        {
            struct sockaddr_in addr;
		    socklen_t len = sizeof(addr);
            int nClient;
            while ( (nClient = ::accept(actFd, (struct sockaddr*)&addr, &len)) >= 0 )
            {
                onAcceptFunc(nClient, shared_from_this());
            }

            if ( !(EAGAIN == errno || EWOULDBLOCK == errno) )
            {
                fdState = FDS_ERROR;
                errMsg += "accept fail " + to_string(errno) + ";";
            }
        }
        else if (FDS_TIMERFD == fdState.load(memory_order_consume))
        {
            onTimerFunc();
        }
        
    }

    if (EPOLLOUT & events)
    {
        if (FDS_CONNECTING == fdState.load(memory_order_consume))
        {
            fdState.store(errHappen? FDS_ERROR : FDS_ESTABLISH, memory_order_release);
            if (onConnectResultFunc)
            {
                onConnectResultFunc(!errHappen, shared_from_this());
            }

            if (!writingMsg.empty() && !errHappen)
            {
                addEvt(EPOLLIN | EPOLLOUT | EPOLLET);
            }
        }
        
        /*int retSnd = */ _send();
        //LOGDEBUG("_send() -> %d", retSnd);
    }

    if ( errHappen )
    {
        fdState.store(FDS_ERROR, memory_order_release);
        errMsg += "error happen;";
    }

    if (fdState.load(memory_order_consume) >= FDS_CLOSE)
    {
        _stop(true);
    }

    return 0;
}

/**
 * Queues a message for asynchronous transmission.
 *
 * Accepted while the state is FDS_INITIAL_CONNECT, FDS_CONNECTING or
 * FDS_ESTABLISH: the message is copied into writeQueue and EPOLLOUT is added
 * to the registration, both while holding writeMtx. The result of addEvt()
 * is not reflected in the return value.
 *
 * @param msg  payload to enqueue
 * @return 0 when the message was queued, -10 when the state does not allow it
 */
int ConnectMeta::sendMsg(const string& msg)
{
    const auto curState = fdState.load(memory_order_consume);
    const bool canQueue = (FDS_INITIAL_CONNECT == curState)
                       || (FDS_CONNECTING == curState)
                       || (FDS_ESTABLISH == curState);

    if (!canQueue)
    {
        LOGERROR("SENDMSG| msg=connect not establish| fd=%d| fdState=%d", actFd, (int)curState);
        return -10;
    }

    std::lock_guard<std::mutex> lck(writeMtx);
    writeQueue.push(msg);
    addEvt(EPOLLOUT);
    return 0;
}

/**
 * Creates a new ConnectMeta for another descriptor on the same epoll instance.
 *
 * The new object copies splitReqFunc, onMessageReachFunc, onAcceptFunc,
 * onCloseFunc and _onRemoveRefFunc from this one; onTimerFunc and
 * onConnectResultFunc are not copied.
 *
 * @param cliFd  descriptor the new object will manage
 * @param fds    state stored into the new object
 * @return raw pointer obtained from new; the caller takes ownership.
 *         NOTE(review): other methods call shared_from_this(), so the result
 *         presumably has to be placed in a shared_ptr - confirm at call sites.
 */
ConnectMeta* ConnectMeta::cloneNew( int cliFd, FdState fds )
{
    ConnectMeta* const clone = new ConnectMeta(epfd, cliFd);

    clone->fdState.store(fds, memory_order_release);

    // Inherit the callbacks.
    clone->splitReqFunc       = splitReqFunc;
    clone->onMessageReachFunc = onMessageReachFunc;
    clone->onAcceptFunc       = onAcceptFunc;
    clone->onCloseFunc        = onCloseFunc;
    clone->_onRemoveRefFunc   = _onRemoveRefFunc;

    return clone;
}